#coding:utf8
import time

from pyspark.sql import *
import os
from pyspark.sql.types import *
import pandas as pd
import pyspark.sql.functions as f
from pyspark import *

if __name__ == '__main__':
    # Demo script: read a tab-separated ratings file into a DataFrame with an
    # explicit schema, then read back a table from MySQL over JDBC and print it.
    spark = SparkSession.builder.appName("test11_wordcount_demo")\
        .master("local[*]")\
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()
    """
    spark.sql.shuffle.partitions sets the default number of partitions used in
    the shuffle stage of SQL computations (the default is 200).
    For cluster mode, 200 is not considered a lot.
    This parameter is independent of the parallelism setting for Spark RDDs.
    """
    sc = spark.sparkContext
    # sc.addJar("/home/kevin/export/server/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/mysql-connector-java-5.1.27-bin.jar")

    # Local file URI for the MovieLens-style ratings file (tab-separated).
    localhost_path = "file://" + os.getcwd() + "/../data/input/sql/u.data"
    # # localhost_path =os.getcwd()+"/../data/input/sql/u.data"

    # Explicit schema: user_id \t movie_id \t rank \t ts (all nullable).
    schema = StructType().add("user_id", StringType(), True) \
        .add("movie_id", IntegerType(), True) \
        .add("rank", IntegerType(), True) \
        .add("ts", StringType(), True)
    df = spark.read.csv(localhost_path, schema, header=False, sep='\t', encoding="utf-8")
    # df.show()

    """
    Writing out over JDBC creates the table automatically, because the
    DataFrame carries its structure info: the StructType records each field's
    name, type, and nullability.
    """
    # df.write.jdbc("jdbc:mysql://hadoop3-02:3306/test1_pymysql"
    #           ,"movie_data"
    #           ,"overwrite"
    #           ,{ "user" : "root" , "password" : "000000" }
    #           )

    # Read the table back from MySQL over JDBC.
    df1 = spark.read.jdbc("jdbc:mysql://hadoop3-02:3306/test1_pymysql"
              , "movie_data"
              # ,["user_id","ts"]
              , properties={"user": "root", "password": "000000"})

    """
    Print the data in the DataFrame.
    Arg 1: how many rows to print (default 20).
    Arg 2: whether to truncate columns — if a column's string value exceeds 20
    characters the remainder is replaced by "...". Pass False to show the full
    content without truncation (the default is True).
    """
    df1.show(30, False)

    # # 1. Write df out to the MySQL database (format/option style)
    # df.write.mode("overwrite").\
    #     format("jdbc").\
    #     option("url", "jdbc:mysql://hadoop3-02:3306/test1_pymysql").\
    #     option("dbtable", "movie_data").\
    #     option("user", "root").\
    #     option("password", "000000").\
    #     save()

    # df2 = spark.read.format("jdbc"). \
    #     option("url", "jdbc:mysql://hadoop3-02:3306/test1_pymysql?useSSL=false&useUnicode=true").\
    #     option("dbtable", "student"). \
    #     option("user", "root"). \
    #     option("password", "000000"). \
    #     load()
    # df2.printSchema()
    # df2.show()

    # BUG FIX: the original used `spark.stop` (attribute access, a no-op);
    # the session is only actually shut down by calling the method.
    spark.stop()





